Config buffer
authorJeroen van der Heijden <jeroen@transceptor.technology>
Thu, 4 Oct 2018 20:58:08 +0000 (22:58 +0200)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Thu, 4 Oct 2018 20:58:08 +0000 (22:58 +0200)
13 files changed:
ChangeLog-2.0.30
include/siri/db/buffer.h
include/siri/db/series.h
itest/run_all.py
itest/siridb-random-data.py
itest/test_buffer.py [new file with mode: 0644]
itest/test_select.py
itest/test_series.py
itest/testing/server.py
itest/testing/testbase.py
src/siri/db/buffer.c
src/siri/db/insert.c
src/siri/db/series.c

index 0f6b6d1160fb27cb46582fa7f02e0776418e739c..dda9f7e3c6e0dc59842383d903a7e6c6a5e15e47 100644 (file)
@@ -10,4 +10,6 @@
   
   * Use posix_fadvise() on the buffer file. (@Svedrin)
   
-  * Refactor buffer and cleanup alternative buffer path.
\ No newline at end of file
+  * The buffer size can now be adjusted by using the database.conf 
+    configuration file.
+  
\ No newline at end of file
index db2b56dd30c2bdc335c733493ee96e609caff856..7f037d7432c7585ee38c2b7e24ca1a03ff82bec2 100644 (file)
@@ -34,9 +34,11 @@ int siridb_buffer_test_path(siridb_t * siridb);
 int siridb_buffer_write_empty(
         siridb_buffer_t * buffer,
         siridb_series_t * series);
-int siridb_buffer_write_last_point(
+int siridb_buffer_write_point(
         siridb_buffer_t * buffer,
-        siridb_series_t * series);
+        siridb_series_t * series,
+        uint64_t * ts,
+        qp_via_t * val);
 
 struct siridb_buffer_s
 {
index 64432c9692c844a319928da2a56d27544315a034..f75b1d862db883f5cc837f0f4afdeb932f4009b5 100644 (file)
@@ -116,6 +116,7 @@ siridb_points_t * siridb_series_get_first(
 siridb_points_t * siridb_series_get_last(
         siridb_series_t * series, int * required_shard);
 siridb_points_t * siridb_series_get_count(siridb_series_t * series);
+void siridb_series_ensure_type(siridb_series_t * series, qp_obj_t * qp_obj);
 /*
  * Increment the series reference counter.
  */
index 5778f982816d0933a6a3343ad76b614c69c36069..9ea0bfa7d21dad4f1d1236493866eb6c3c6cd037 100644 (file)
@@ -14,6 +14,7 @@ from test_compression import TestCompression
 from test_log import TestLog
 from test_log import TestLog
 from test_pipe_support import TestPipeSupport
+from test_buffer import TestBuffer
 
 
 Server.BUILDTYPE = 'Release'
@@ -31,3 +32,4 @@ if __name__ == '__main__':
     run_test(TestUser())
     run_test(TestLog())
     run_test(TestPipeSupport())
+    run_test(TestBuffer())
index 53e29e3aaf75e4e31a3ba58a344a3ee569b7d3f2..f4334d124d7a2b7292c34f0437baf819ecbc8dc2 100755 (executable)
@@ -77,7 +77,7 @@ class Series:
     _interval_range = None
     _r = None
 
-    def __init__(self, r, allowed_kinds=(int, float, str)):
+    def __init__(self, r, allowed_kinds=(int, float, str), wrong_type=False):
         self.kind = r.choice(allowed_kinds)
         self.lval = {
             str: '',
@@ -96,13 +96,13 @@ class Series:
         self.likely_equal = r.choice([0.01, 0.1, 0.2, 0.5, 0.99])
         self.likely_change_sign = r.choice([0.0, 0.1, 0.25, 0.5, 0.9])
 
-        self.as_int = self.kind == float and r.random() > 0.9
+        self.as_int = wrong_type and self.kind == float and r.random() > 0.9
         self.likely_inf = r.random() * 0.2 \
             if self.kind == float and r.random() > 0.95 else False
         self.likely_nan = r.random() * 0.2 \
             if self.kind == float and r.random() > 0.95 else False
 
-        self.gen_float = self.kind == int and r.random() > 0.97
+        self.gen_float = wrong_type and self.kind == int and r.random() > 0.97
 
         self.name = self._gen_name()
         Series._series.append(self)
@@ -166,7 +166,10 @@ class Series:
         kinds = [translate[k] for k in args.kinds]
 
         for i in range(args.num_series):
-            Series(r=series_rand, allowed_kinds=kinds)
+            Series(
+                r=series_rand,
+                allowed_kinds=kinds,
+                wrong_type=args.wrong_type)
 
     def _gen_name(self):
         name = '/n:{}/range:{},{}/eq:{}/cs:{}/opt:{}{}{}{}{}'.format(
@@ -409,6 +412,11 @@ if __name__ == '__main__':
         default=('int', 'float'),
         choices=('int', 'float'))  # , 'str'
 
+    parser.add_argument(
+        '--wrong-type',
+        action='store_true',
+        help='Allow series to insert points using a wrong type')
+
     parser.add_argument(
         '--max-parallel',
         type=int,
diff --git a/itest/test_buffer.py b/itest/test_buffer.py
new file mode 100644 (file)
index 0000000..28a8e84
--- /dev/null
@@ -0,0 +1,123 @@
+import os
+import asyncio
+import functools
+import random
+import time
+from testing import Client
+from testing import default_test_setup
+from testing import gen_data
+from testing import gen_points
+from testing import gen_series
+from testing import InsertError
+from testing import PoolError
+from testing import QueryError
+from testing import run_test
+from testing import Series
+from testing import Server
+from testing import ServerError
+from testing import SiriDB
+from testing import TestBase
+from testing import UserAuthError
+
+
+class TestBuffer(TestBase):
+    title = 'Test buffer object'
+
+    async def _add_points(self):
+        for series_name in ['iris', 'db', 'ligo', 'sasha']:
+            if series_name not in self.total:
+                self.total[series_name] = []
+            batches = sum([ord(c) for c in series_name]) % 100
+            for i in range(batches):
+                npoints = []
+                n = int(i**0.5 * 10000 % 5) + 1
+                for p in range(n):
+                    self.ts += (n + 5000) if i % 2 else (n - 5000)
+                    npoints.append([self.ts, i*1000+p])
+                self.total[series_name].extend(npoints)
+                self.total[series_name].sort()
+                await self.client0.insert({series_name: npoints})
+
+    async def _test_equal(self):
+        for series_name, points in self.total.items():
+            res = await self.client0.query(f'select * from "{series_name}"')
+            res = res[series_name]
+            self.assertEqual(len(points), len(res))
+            self.assertEqual(points, res)
+
+    async def _change_buf_size(self, buffer_size):
+        self.client0.close()
+        result = await self.server0.stop()
+        self.assertTrue(result)
+        self.server0.set_buffer_size(self.db, buffer_size)
+        await self.server0.start(sleep=5)
+        await self.client0.connect()
+        res = await self.client0.query('show buffer_size')
+        self.assertEqual(res['data'][0]['value'], buffer_size)
+        await self._test_equal()
+
+    async def _change_buf_path(self, buffer_path):
+        self.client0.close()
+        result = await self.server0.stop()
+        self.assertTrue(result)
+        self.server0.set_buffer_path(self.db, buffer_path)
+        await self.server0.start(sleep=5)
+        await self.client0.connect()
+        res = await self.client0.query('show buffer_path')
+        self.assertEqual(res['data'][0]['value'], buffer_path)
+        res = await self.client0.query('show open_files')
+        self.assertEqual(res['data'][0]['value'], 3)
+        res = await self.client0.query(
+            f'alter server {self.uuid} set backup_mode true')
+        await asyncio.sleep(5)
+        res = await self.client0.query('show open_files')
+        self.assertEqual(res['data'][0]['value'], 0)
+        res = await self.client0.query(
+            f'alter server {self.uuid} set backup_mode false')
+        await self._test_equal()
+
+    @default_test_setup(1, time_precision='s', compression=False)
+    async def run(self):
+        await self.client0.connect()
+
+        res = await self.client0.query('show uuid')
+        self.uuid = res['data'][0]['value']
+
+        self.ts = 1500000000
+        self.total = {}
+
+        await self._add_points()
+        await self._test_equal()
+
+        await self._change_buf_path(os.path.join(
+            self.server0.dbpath,
+            self.db.dbname,
+            '../buf/'))
+
+        await self._change_buf_size(8192)
+
+        await self._add_points()
+        await self._test_equal()
+
+        await self._change_buf_size(8192)
+        await self._change_buf_size(512)
+
+        await self._add_points()
+        await self._test_equal()
+
+        await self._change_buf_size(1024)
+
+        await self._change_buf_path(os.path.join(
+            self.server0.dbpath,
+            self.db.dbname,
+            'buf/'))
+
+        return False
+
+
+if __name__ == '__main__':
+    SiriDB.LOG_LEVEL = 'INFO'
+    Server.HOLD_TERM = True
+    Server.MEM_CHECK = True
+    Server.BUILDTYPE = 'Debug'
+    run_test(TestBuffer())
index 4031e4e05b39854be9b308e447160869cfcad4a4..1a9f5c615bab2970bd45abba29ab68422c6a335e 100644 (file)
@@ -87,7 +87,7 @@ DATA = {
 class TestSelect(TestBase):
     title = 'Test select and aggregate functions'
 
-    @default_test_setup(1, compression=False)
+    @default_test_setup(1, compression=False, buffer_size=1024)
     async def run(self):
         await self.client0.connect()
 
index 37b35945bd7e024bacdc07708bfe2c36026dd1c9..4ff585020aa075ce02a6d597ca9ef927d2a4eab9 100644 (file)
@@ -23,6 +23,48 @@ PI = 'ԉ'
 Klingon = '     ' + \
     'qajunpaQHeylIjmo’ batlh DuSuvqang charghwI’ ‘It.'
 
+data = {
+    'string': [
+        [1538660000, "some string value"],
+        [1538660010, -123456789],
+        [1538660020, -0.5],
+        [1538660030, 1/3],
+    ],
+    'integer': [
+        [1538660000, 1],
+        [1538660010, 35.6],
+        [1538660020, "-50,6%"],
+        [1538660030, ""],
+    ],
+    'double': [
+        [1538660000, 1.0],
+        [1538660010, -35],
+        [1538660010, "-50,6%"],
+        [1538660030, ""],
+    ]
+}
+
+expected = {
+    'string': [
+        [1538660000, "some string value"],
+        [1538660010, '-123456789'],
+        [1538660020, '-0,500000'],
+        [1538660030, '0,333333'],
+    ],
+    'integer': [
+        [1538660000, 1],
+        [1538660010, 35],
+        [1538660020, -50],
+        [1538660030, 0],
+    ],
+    'double': [
+        [1538660000, 1.0],
+        [1538660010, -35.0],
+        [1538660010, -50.6],
+        [1538660030, 0.0],
+    ]
+}
+
 
 class TestSeries(TestBase):
     title = 'Test series object'
@@ -47,6 +89,15 @@ class TestSeries(TestBase):
             await self.client0.query('select * from "{}"'.format(Klingon)),
             {Klingon: sorted(points)})
 
+        self.assertEqual(
+            await self.client0.insert(data),
+            {'success_msg': 'Successfully inserted 12 point(s).'})
+
+        self.assertAlmostEqual(
+            await self.client0.query(
+                'select * from "string", "integer", "double"'),
+            expected)
+
         self.client0.close()
 
         # return False
index c5d3b5085c0125d25bc6486e819e0c26203e157d..a629449e5adfaf47944baacc07353e34637f4041 100644 (file)
@@ -59,6 +59,8 @@ class Server:
         self.dbpath = os.path.join(TEST_DIR, 'dbpath{}'.format(self.n))
         self.name = 'SiriDB:{}'.format(self.listen_backend_port)
         self.pid = None
+        self.buffer_path = None
+        self.buffer_size = None
 
     @property
     def addr(self):
@@ -164,6 +166,35 @@ class Server:
         self.pid = None
         return True
 
+    def set_buffer_size(self, db, buffer_size):
+        self.buffer_size = buffer_size
+        config = configparser.RawConfigParser()
+        config.add_section('buffer')
+        if self.buffer_path is not None:
+            config.set('buffer', 'path', self.buffer_path)
+        config.set('buffer', 'size', self.buffer_size)
+        with open(os.path.join(
+                self.dbpath, db.dbname, 'database.conf'), 'w') as f:
+            config.write(f)
+
+    def set_buffer_path(self, db, buffer_path):
+        assert(buffer_path.endswith('/'))
+        curfile = os.path.join(self.dbpath, db.dbname, 'buffer.dat') \
+            if self.buffer_path is None else \
+            os.path.join(self.buffer_path, 'buffer.dat')
+        if not os.path.exists(buffer_path):
+            os.makedirs(buffer_path)
+        os.rename(curfile, os.path.join(buffer_path, 'buffer.dat'))
+        self.buffer_path = buffer_path
+        config = configparser.RawConfigParser()
+        config.add_section('buffer')
+        config.set('buffer', 'path', self.buffer_path)
+        if self.buffer_size is not None:
+            config.set('buffer', 'size', self.buffer_size)
+        with open(os.path.join(
+                self.dbpath, db.dbname, 'database.conf'), 'w') as f:
+            config.write(f)
+
     def kill(self):
         print("!!!!!!!!!!!! KILLL !!!!!!!!!!")
         os.system('kill -9 {}'.format(self.pid))
index 9efcbaf5be0535a8fe79f0b3f32314935ce8d6c9..d9a7204af343bdb21be2ca6e8c3892237397a995 100644 (file)
@@ -116,7 +116,9 @@ class TestBase(unittest.TestCase):
                     assert isinstance(point, list) and len(point) == 2, \
                         'Expecting a point to be a list of 2 items'
                     super().assertEqual(a[series][i][0], point[0])
-                    if math.isnan(a[series][i][1]):
+                    if isinstance(a[series][i][1], str):
+                        super().assertEqual(a[series][i][1], point[1])
+                    elif math.isnan(a[series][i][1]):
                         assert math.isnan(point[1]), \
                             'Expecting point `{}` to be `nan`, got: `{}`' \
                             .format(i, point[1])
index 2cf61269dd76e8f7035c182c64c98005910e7cd5..dc860bc3e882341502d7e25e558cf172d963f252 100644 (file)
@@ -152,20 +152,19 @@ int siridb_buffer_write_empty(
  *
  * Returns 0 if success or EOF in case of an error.
  */
-int siridb_buffer_write_last_point(
+int siridb_buffer_write_point(
         siridb_buffer_t * buffer,
-        siridb_series_t * series)
+        siridb_series_t * series,
+        uint64_t * ts,
+        qp_via_t * val)
 {
-    siridb_point_t * point;
     const size_t sz = sizeof(uint64_t) + sizeof(qp_via_t);
     char buf[sz];
-    int last_idx = series->buffer->len - 1;
-    assert (last_idx >= 0);
 
-    point = series->buffer->data + last_idx;
-
-    memcpy(buf, &point->ts, sizeof(uint64_t));
-    memcpy(buf + sizeof(uint64_t), &point->val, sizeof(qp_via_t));
+    ssize_t last_idx = series->buffer->len - 1;
+    assert (last_idx >= 0);
+    memcpy(buf, ts, sizeof(uint64_t));
+    memcpy(buf + sizeof(uint64_t), val, sizeof(qp_via_t));
 
     return (
         /* jump to position where to write the new point */
index 6ca4b23f4f840c79bd6540600e997f2739d65d9d..cd1f6b143897707c6a96a2513267e8e7113baee3 100644 (file)
@@ -553,6 +553,7 @@ static int8_t INSERT_local_work(
         ts = (uint64_t *) &qp_series_ts.via.int64;
         SERIES_UPDATE_TS(series)
 
+        siridb_series_ensure_type(series, &qp_series_val);
         if ((tp = qp_next(unpacker, qp_series_name)) != QP_ARRAY2 &&
                 series->buffer != NULL)
         {
@@ -584,10 +585,7 @@ static int8_t INSERT_local_work(
             if (series->tp == TP_STRING)
             {
                 val = &forstr;
-                val->str = qp_is_raw(qp_series_val.tp) ?
-                        strndup(qp_series_val.via.str, qp_series_val.len) :
-                        strdup("");
-
+                val->str = strndup(qp_series_val.via.str, qp_series_val.len);
                 if (val->str == NULL)
                 {
                     ERR_ALLOC
@@ -606,13 +604,12 @@ static int8_t INSERT_local_work(
             {
                 qp_next(unpacker, &qp_series_ts); /* ts     */
                 qp_next(unpacker, &qp_series_val); /* val   */
+                siridb_series_ensure_type(series, &qp_series_val);
 
                 if (series->tp == TP_STRING)
                 {
-                    val->str = qp_is_raw(qp_series_val.tp) ?
-                            strndup(qp_series_val.via.str, qp_series_val.len) :
-                            strdup("");
-
+                    val->str = \
+                            strndup(qp_series_val.via.str, qp_series_val.len);
                     if (val->str == NULL)
                     {
                         ERR_ALLOC
@@ -772,6 +769,8 @@ static int INSERT_local_work_test(
         ts = (uint64_t *) &qp_series_ts.via.int64;
         SERIES_UPDATE_TS(series)
 
+        siridb_series_ensure_type(series, &qp_series_val);
+
         if ((tp = qp_next(unpacker, qp_series_name)) != QP_ARRAY2 &&
                 series->buffer != NULL)
         {
@@ -803,10 +802,7 @@ static int INSERT_local_work_test(
             if (series->tp == TP_STRING)
             {
                 val = &forstr;
-                val->str = qp_is_raw(qp_series_val.tp) ?
-                        strndup(qp_series_val.via.str, qp_series_val.len) :
-                        strdup("");
-
+                val->str = strndup(qp_series_val.via.str, qp_series_val.len);
                 if (val->str == NULL)
                 {
                     ERR_ALLOC
@@ -824,13 +820,12 @@ static int INSERT_local_work_test(
             {
                 qp_next(unpacker, &qp_series_ts);   /*    ts    */
                 qp_next(unpacker, &qp_series_val);  /*    val   */
+                siridb_series_ensure_type(series, &qp_series_val);
 
                 if (series->tp == TP_STRING)
                 {
-                    val->str = qp_is_raw(qp_series_val.tp) ?
-                            strndup(qp_series_val.via.str, qp_series_val.len) :
-                            strdup("");
-
+                    val->str = \
+                            strndup(qp_series_val.via.str, qp_series_val.len);
                     if (val->str == NULL)
                     {
                         ERR_ALLOC
index 7c5adde1c9eacb7e53fe1fc62ee8ab2eed618142..3989581d8c10ed10e23950e73df9252ff31e4d75 100644 (file)
 #define BEND series->buffer->points->data[series->buffer->points->len - 1].ts
 #define DROPPED_DUMMY 1
 
+/*
+ * Used for storing double and integers as string. this is not very important
+ * if it will not store all characters generated so 64 is more than enough
+ */
+#define STR_TYPE_BUF_SZ 64
+static char str_type_buf[STR_TYPE_BUF_SZ];
+
 static int SERIES_save(siridb_t * siridb);
 static int SERIES_load(siridb_t * siridb, imap_t * dropped);
 static int SERIES_read_dropped(siridb_t * siridb, imap_t * dropped);
@@ -154,7 +161,7 @@ int siridb_series_add_point(
     }
     else
     {
-        if (siridb_buffer_write_last_point(siridb->buffer, series))
+        if (siridb_buffer_write_point(siridb->buffer, series, ts, val))
         {
             ERR_FILE
             log_critical("Cannot write new point to buffer");
@@ -935,6 +942,86 @@ siridb_points_t * siridb_series_get_count(siridb_series_t * series)
     return points;
 }
 
+void siridb_series_ensure_type(siridb_series_t * series, qp_obj_t * qp_obj)
+{
+    switch(series->tp)
+    {
+    case TP_INT:
+        if (qp_obj->tp != QP_INT64)
+        {
+            if (qp_obj->tp == QP_DOUBLE)
+            {
+                double d = qp_obj->via.real;
+                qp_obj->via.int64 = (int64_t) d;
+            }
+            else if (qp_obj->tp == QP_RAW)
+            {
+                char * s = strndup(qp_obj->via.str, qp_obj->len);
+                qp_obj->via.int64 = \
+                        (s == NULL) ? 0 : (int64_t) strtoll(s, NULL, 10);
+                free(s);
+            }
+            else
+            {
+                assert(0);
+            }
+            qp_obj->tp = QP_INT64;
+        }
+        return;
+    case TP_DOUBLE:
+        if (qp_obj->tp != QP_DOUBLE)
+        {
+            if (qp_obj->tp == QP_INT64)
+            {
+                int64_t i = qp_obj->via.int64;
+                qp_obj->via.real = (double) i;
+            }
+            else if (qp_obj->tp == QP_RAW)
+            {
+                char * s = strndup(qp_obj->via.str, qp_obj->len);
+                qp_obj->via.real = \
+                        (s == NULL) ? 0.0 : strtod(s, NULL);
+                free(s);
+            }
+            else
+            {
+                assert(0);
+            }
+            qp_obj->tp = TP_DOUBLE;
+        }
+        return;
+    case TP_STRING:
+        if (qp_obj->tp != QP_RAW)
+        {
+            if (qp_obj->tp == QP_INT64)
+            {
+                snprintf(
+                        str_type_buf,
+                        STR_TYPE_BUF_SZ,
+                        "%" PRId64,
+                        qp_obj->via.int64);
+                qp_obj->via.str = str_type_buf;
+            }
+            else if (qp_obj->tp == QP_DOUBLE)
+            {
+                snprintf(
+                        str_type_buf,
+                        STR_TYPE_BUF_SZ,
+                        "%f",
+                        qp_obj->via.real);
+                qp_obj->via.str = str_type_buf;
+            }
+            else
+            {
+                assert(0);
+            }
+            qp_obj->tp = QP_RAW;
+        }
+        return;
+    }
+    assert (0);
+}
+
 /*
  * Calculate the server id.
  * Returns 0 or 1, representing a server in a pool)
@@ -1466,6 +1553,7 @@ static int SERIES_load(siridb_t * siridb, imap_t * dropped)
     siridb_series_t * series;
     qp_types_t tp;
     uint32_t series_id;
+    uint8_t series_tp;
 
     /* we should not have any series at this moment */
     assert(siridb->max_series_id == 0);
@@ -1502,10 +1590,11 @@ static int SERIES_load(siridb_t * siridb, imap_t * dropped)
 
         if (imap_get(dropped, series_id) == NULL)
         {
+            series_tp = (uint8_t) qp_series_tp.via.int64;
             series = SERIES_new(
                     siridb,
                     series_id,
-                    (uint8_t) qp_series_tp.via.int64,
+                    series_tp,
                     siridb->server->pool,
                     (const char *) qp_series_name.via.raw);
             if (series != NULL)